package cn.inspur.spark

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}


/**
 * One product-listing record; all fields are kept as raw strings.
 *
 * Not referenced by any code visible in this file.
 *
 * NOTE(review): the name `List` shadows `scala.List` for code in this file;
 * any `List(...)` written here refers to this case class, not the collection.
 * Consider renaming (e.g. `ProductListing`) once all users are known.
 * NOTE(review): the meaning of `A` is unclear from the code — confirm.
 * `ZiYing` is presumably the pinyin for "self-operated" (自营) — confirm.
 */
case class List(ProductID: String, ProductName: String, A: String, Price: String, CommentNum: String, StoreName: String, ZiYing: String)

/**
 * One product-detail (phone specification) record; all fields are raw strings.
 *
 * `DataProcessing` builds it from 19 separator-delimited fields, in declaration
 * order, read from the cleaned detail file. Spark SQL queries there refer to the
 * columns `ProductID`, `OS` and `Brand` by these exact field names.
 *
 * NOTE(review): `ProduceName`, `ProducePlace` and `BackCameraElemnt` look like
 * misspellings of `ProductName`, `ProductPlace` and `BackCameraElement`. Renaming
 * them would change the column names of the derived views, so they are left as-is.
 */
case class Detail(ProduceName: String, ProductID: String, ProductWeight: String, ProducePlace: String, CPU: String, RAM: String, ROM: String,
                  MemoryCard: String, CameraNum: String, BackCameraElemnt: String, FrontCameraElement: String, ScreenSize: String, ResolutionRatio: String,
                  ScreenRatio: String, Charge: String,Hot: String, OS: String,GameSet :String, Brand: String)

/**
 * One user-comment record; all fields are raw strings.
 *
 * `DataProcessing` builds it from 12 separator-delimited fields, in declaration
 * order, read from the cleaned comment file. Its queries use `ID`, `ProductID`,
 * `Comment`, `Score` and `UserGrade`. `ProductID` is the join key to [[Detail]].
 *
 * NOTE(review): `ISPhone` presumably marks whether the comment was posted from a
 * mobile device — confirm against the crawler output.
 */
case class Comment(ID: String, ProductID: String, GUID: String, Comment: String, CommentTime: String, ConsultID: String, ConsultTime: String,
                   Score: String, UserName: String, UserGrade: String, ISPhone: String, PurchasePlatform: String)


object DataProcessing {

  /** Spark application name; all three jobs originally registered under this same name. */
  private val AppName = "UserGradeNum"

  /** Cleaned comment records on HDFS: one [[Comment]] per line, 12 fields. */
  private val CommentPath = "hdfs://nd11:9000/e-commerce/cleared/comment/part-00000"

  /** Cleaned product-detail records on HDFS: one [[Detail]] per line, 19 fields. */
  private val DetailPath = "hdfs://nd11:9000/e-commerce/cleared/detail/part-00000"

  /** Field separator of the cleaned files: the U+0001 (SOH) control character. */
  private val FieldSep = "\u0001"

  /** MySQL database that receives every result table. */
  private val JdbcUrl = "jdbc:mysql://cdb-rov6dd1k.bj.tencentcdb.com:10181/JD"

  /** Environment variable holding the MySQL password (the password is not kept in source). */
  private val PasswordEnvVar = "JD_MYSQL_PASSWORD"

  /**
   * Runs all statistics jobs on one shared session, then stops that session
   * even if one of the jobs fails.
   */
  def main(args: Array[String]): Unit = {
    val sess = session()
    try {
      UserGradeNum()
      SalesVolume()
      OSRate()
    } finally {
      sess.stop()
    }
  }

  /** Number of distinct comments per user grade; overwrites MySQL table `UserGradeNum`. */
  def UserGradeNum(): Unit = {
    val sess = session()
    registerComments(sess)
    saveToMySql(
      sess.sql("select UserGrade, count(ID) as cnt from c group by UserGrade"),
      "UserGradeNum")
  }

  /**
   * Per-brand statistics over the deduplicated comments joined with the
   * deduplicated product details on `ProductID`. Overwrites three MySQL tables:
   *  - `BrandScore`: comment count per (brand, buyer score);
   *  - `BrandSalesVolume`: comment count per brand, used as the sales figure;
   *  - `FenCi`: word counts produced by segmenting each brand's comments with `jb.analysis`.
   */
  def SalesVolume(): Unit = {
    val sess = session()
    import sess.implicits._
    registerComments(sess)
    registerDetails(sess)

    // Buyer score distribution per brand.
    saveToMySql(
      sess.sql("select d.Brand, c.Score, count(1) as cnt from c join d on c.ProductID = d.ProductID group by d.Brand, c.Score"),
      "BrandScore")

    // Comment count per brand. The ORDER BY is not preserved by the JDBC write.
    saveToMySql(
      sess.sql("select d.Brand, count(c.ID) as cnt from d, c where d.ProductID = c.ProductID group by d.Brand order by count(c.ID) desc"),
      "BrandSalesVolume")

    // Hot words: concatenate each brand's distinct comments (the left join keeps
    // brands without comments), segment them with jb.analysis (Jieba, per the
    // original comment), then count the "brand|word" tokens it emits.
    val wordCounts = sess
      .sql("select d.Brand, concat_ws('，', collect_set(c.Comment)) from d left join c on d.ProductID = c.ProductID group by d.Brand")
      .rdd
      .map(row => jb.analysis(row.getString(0), row.getString(1)))
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .map { case (tagged, cnt) =>
        // Tokens are expected as "brand|word"; tolerate malformed ones.
        tagged.split("\\|") match {
          case Array(brand, word) => (brand, word, cnt)
          case Array(brand)       => (brand, "", cnt)
          case _                  => ("", "", cnt)
        }
      }
      .repartition(1) // single partition => single JDBC writer
      .toDF("Brand", "Word", "Cnt")
    saveToMySql(wordCounts, "FenCi")
  }

  /** Number of distinct product-detail records per operating system; overwrites MySQL table `OSRate`. */
  def OSRate(): Unit = {
    val sess = session()
    registerDetails(sess)
    saveToMySql(sess.sql("select OS, count(1) as cnt from d group by OS"), "OSRate")
  }

  /**
   * Returns the shared session; `getOrCreate` hands back the one already started by `main`.
   * NOTE(review): `master("local")` set in code takes precedence over spark-submit's
   * `--master`; kept from the original.
   */
  private def session(): SparkSession =
    SparkSession.builder().appName(AppName).master("local").getOrCreate()

  /**
   * Splits one cleaned line into its fields. The negative limit keeps trailing
   * empty fields. The default `split` drops them, which made records with empty
   * last columns fail with ArrayIndexOutOfBoundsException.
   */
  private def splitFields(line: String): Array[String] = line.split(FieldSep, -1)

  /** Registers the raw comments as view `Comment` and their distinct rows as view `c`. */
  private def registerComments(sess: SparkSession): Unit = {
    import sess.implicits._
    sess.read.textFile(CommentPath)
      .map(line => splitFields(line))
      .map(f => Comment(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9), f(10), f(11)))
      .createOrReplaceTempView("Comment")
    sess.sql("select distinct * from Comment").createOrReplaceTempView("c")
  }

  /** Registers the raw product details as view `Detail` and their distinct rows as view `d`. */
  private def registerDetails(sess: SparkSession): Unit = {
    import sess.implicits._
    sess.read.textFile(DetailPath)
      .map(line => splitFields(line))
      .map(f => Detail(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9), f(10), f(11), f(12), f(13), f(14), f(15), f(16), f(17), f(18)))
      .createOrReplaceTempView("Detail")
    sess.sql("select distinct * from Detail").createOrReplaceTempView("d")
  }

  /** Replaces MySQL table `table` with the rows of `df`. */
  private def saveToMySql(df: DataFrame, table: String): Unit =
    df.write.mode(SaveMode.Overwrite).jdbc(JdbcUrl, table, jdbcProps())

  /** JDBC connection properties; the password comes from `PasswordEnvVar`, defaulting to "". */
  private def jdbcProps(): Properties = {
    val prop = new Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", sys.env.getOrElse(PasswordEnvVar, ""))
    prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    prop.setProperty("url", JdbcUrl)
    prop
  }

}

